package com.atguigu.flink.datastreamapi.sink;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * Created by Smexy on 2023/11/13
 *

 JdbcSink.exactlyOnceSink(): 在1.14之后推出的。由Flink在内部保证写出的精确一次。
        借助分布式事务机制。分布式事务又需要依赖数据库的官方驱动支持。
        目前，使用这种方式，非法麻烦，不成熟。

 JdbcSink.sink(): 是at least once。
        但是可以通过写出的sql语句的幂等机制，保证输出的精确一次。
            at least once + 语句是幂等 = exactlyOnce。

        幂等： 执行1次和执行N次的效果一样。

        redis:  一些api支持，例如 set k v
                有一些不支持的，例如  incr k

        hbase:  支持。
                    put t1,1001,f1:name,jack

        mysql:  某些写入。
                    insert into  不支持
                    insert into on duplicate update 支持
                    replace into   支持

 */
public class Demo4_JDBCSink
{
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
            构造JDBCSink
            SinkFunction<T> sink(
            String sql,   sql语句
            JdbcStatementBuilder<T> statementBuilder,  预编译sql，填充占位符
            JdbcExecutionOptions executionOptions,     设置写出的一些参数
            JdbcConnectionOptions connectionOptions   设置连接的参数
            )
         */
        SinkFunction<WaterSensor> sink = JdbcSink
            .<WaterSensor>sink(
                "REPLACE INTO `ws` VALUES(?,?,?)",
                new JdbcStatementBuilder<WaterSensor>()
                {
                    @Override
                    public void accept(PreparedStatement ps, WaterSensor waterSensor) throws SQLException {
                        //索引从1开始
                        ps.setString(1, waterSensor.getId());
                        ps.setLong(2, waterSensor.getTs());
                        ps.setInt(3, waterSensor.getVc());
                    }
                },
                JdbcExecutionOptions.builder()
                                    .withBatchIntervalMs(1000) //不满足一批，参考时间间隔
                                    .withMaxRetries(3)  //设置写失败，重试的最大次数
                                    .withBatchSize(100)   //一批写出的数据条数
                                    .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:mysql://hadoop102:3306/Mybatis?useSSL=false&useUnicode=true&characterEncoding=UTF-8")
                    .withUsername("root")
                    .withPassword("000000")
                    .withDriverName("com.mysql.cj.jdbc.Driver")
                    .build()
            );


        //每一种传感器的最大水位记录在mysql数据库中
                env
                   .socketTextStream("hadoop102", 8888)
                   .map(new WaterSensorMapFunction())
                    .addSink(sink);


                try {
                            env.execute();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }

    }
}
